[WIP] Orchestration + GCP modernization: composition refactor, uv migration, OGC Features #69
Merged
Pre production
Added specific conductance and microsiemens per centimeter to the constants file to support new functionality in the data integration engine.
Getting more than 10 at a time often causes HTTP timeout errors. Decreasing chunk size to 10 seems to fix the issue.
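A minimal sketch of the chunking idea in this commit message. The helper name `chunked` and the site IDs are hypothetical; the actual retrieval code is not shown here.

```python
from typing import Iterator, List, Sequence


def chunked(items: Sequence[str], size: int = 10) -> Iterator[List[str]]:
    """Yield consecutive slices of `items`, each with at most `size` entries."""
    if size < 1:
        raise ValueError("size must be >= 1")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


# Hypothetical site IDs, for illustration only.
site_ids = [f"site-{i:03d}" for i in range(25)]
batches = list(chunked(site_ids, size=10))
# Each batch would back one HTTP request instead of a single oversized one.
```

A smaller `size` trades more round trips for shorter requests, which is the trade-off the commit describes.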
This update adds specific conductance as a parameter in the configuration, with the output unit of microsiemens per centimeter (uS/cm). It also updates the transformer to handle conversions for specific conductance, and adds the appropriate mapping for this parameter in the connectors.
Update specific conductance to use the correct mappings and remove BOR from the specific conductance agencies. BOR sites in NM report conductivity, not specific conductance (which is measured in a lab at 25 °C).
To count as specific conductance, a value must be measured at 25 °C; otherwise it is conductivity.
Previously, if the tag parameter was set to Null or not provided, it defaulted to "data". This caused issues for endpoints that have no "data" key in their response, or for which the full object was desired.
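A minimal sketch of the behavior this commit message implies, assuming a missing tag now means "return the whole response". The function name `extract_payload` and the sample payloads are hypothetical.

```python
from typing import Any, Optional


def extract_payload(response: Any, tag: Optional[str] = None) -> Any:
    """Return response[tag] when a tag is given, otherwise the full object."""
    if tag is None:
        # No implicit "data" default: hand back the whole response.
        return response
    return response[tag]


# Hypothetical payloads: one wrapped in a "data" key, one bare list.
wrapped = {"data": [{"id": 1}], "count": 1}
bare = [{"id": 1}, {"id": 2}]

records_from_wrapped = extract_payload(wrapped, tag="data")
records_from_bare = extract_payload(bare)
```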
BoR does not report bicarbonate data
The USGS deprecated the old water level retrieval API endpoints, so this commit updates the NWISWaterLevelSource to use the new OGC API endpoints. Additionally, the chunk size for site retrieval is reduced to 5 to avoid URI length issues and httpx read timeouts.
This preserves the original traceback, which is helpful for debugging.
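The commit body is not shown, so the following is only one common way to keep the original traceback in Python: log, then re-raise with a bare `raise`. All names are hypothetical.

```python
import logging
import traceback

logger = logging.getLogger("traceback_example")


def parse_value(raw: str) -> float:
    return float(raw)  # raises ValueError on non-numeric input


def parse_logged(raw: str) -> float:
    try:
        return parse_value(raw)
    except ValueError:
        logger.error("could not parse %r", raw)
        raise  # bare raise: same exception object, original traceback intact


try:
    parse_logged("n/a")
except ValueError as exc:
    # Function names of the frames recorded on the exception, outermost first.
    frames = [frame.name for frame in traceback.extract_tb(exc.__traceback__)]
```

The innermost frame is still `parse_value`, so the log points at the line that actually failed rather than at the handler.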
utf-8 encoding fix & persister error logging
- T1: migrate to uv/hatchling; drop setup.py/requirements.txt/pytest.ini/mypy.ini, add pyproject.toml + uv.lock; fix --no-* CLI flag polarity bug (§B.1 §V-1)
- T2: add OGC Features persister (summary + flat timeseries) + 9 tests (§V-4 §V-5 §V-6 §V-7)
- T3: orchestration scaffold: pyproject.toml, products.yaml, DIEConfigResource (§V-2 §V-3)
- T4: Dagster assets for analytes + definitions.py (4 assets + 4 schedules) (§V-8)
- T5: water-level timeseries assets (flat one-feature-per-observation) (§V-5 §V-6)
- T6: GCS resource with atomic latest.geojson write (§V-9)
- T7: Cloud Run Job Dockerfile + cloudbuild.yaml + cloudrun.yaml + README (§V-3)
- T8: update CI/CD for uv; add orchestration-ci.yml workflow (§V-1)
- T9: pygeoapi (config.yml.j2, generate_config.py, Dockerfile, cloudbuild.yaml) (§V-10)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
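To illustrate the "flat one-feature-per-observation" layout mentioned in T2 and T5, here is a rough sketch that builds a GeoJSON FeatureCollection. The function name, the property names, and the sample values are assumptions, not the persister's actual schema.

```python
import json
from typing import Any, Dict, List


def observations_to_collection(
    site: Dict[str, Any], observations: List[Dict[str, Any]]
) -> Dict[str, Any]:
    """Build a FeatureCollection with one Point Feature per observation."""
    features = []
    for obs in observations:
        features.append(
            {
                "type": "Feature",
                # GeoJSON coordinate order is [longitude, latitude].
                "geometry": {
                    "type": "Point",
                    "coordinates": [site["lon"], site["lat"]],
                },
                # Site attributes are repeated on every feature ("flat").
                "properties": {
                    "site_id": site["id"],
                    "datetime": obs["datetime"],
                    "value": obs["value"],
                    "units": obs["units"],
                },
            }
        )
    return {"type": "FeatureCollection", "features": features}


# Made-up sample data, for illustration only.
site = {"id": "well-1", "lon": -106.6, "lat": 35.1}
observations = [
    {"datetime": "2024-01-01T00:00:00Z", "value": 12.5, "units": "ft"},
    {"datetime": "2024-02-01T00:00:00Z", "value": 12.9, "units": "ft"},
]
collection = observations_to_collection(site, observations)
geojson_text = json.dumps(collection)
```

Repeating the site attributes on each feature costs some file size but lets a feature server filter observations without a join.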
…ity/observability/readability/composition tasks
§V: No class inherits Loggable (§T.10)
§V: Polygon cache keyed by WKT at class level, not instance (§T.17)
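A minimal sketch of a class-level cache keyed by WKT string. The class name `PolygonFilter` and the injected parser are hypothetical; in practice the parser could be something like `shapely.wkt.loads`, but a stand-in is used here so the example has no dependencies.

```python
from typing import Callable, ClassVar, Dict, List


class PolygonFilter:
    # Class-level cache: shared by every instance, keyed by the WKT string.
    _polygon_cache: ClassVar[Dict[str, object]] = {}

    def __init__(self, parser: Callable[[str], object]) -> None:
        self._parser = parser

    def polygon(self, wkt: str) -> object:
        cache = type(self)._polygon_cache
        if wkt not in cache:
            cache[wkt] = self._parser(wkt)  # parse once per distinct WKT
        return cache[wkt]


parse_calls: List[str] = []


def fake_parser(wkt: str) -> object:
    """Stand-in parser that records how often it is invoked."""
    parse_calls.append(wkt)
    return ("parsed", wkt)


wkt = "POLYGON ((0 0, 1 0, 1 1, 0 0))"
first = PolygonFilter(fake_parser).polygon(wkt)
second = PolygonFilter(fake_parser).polygon(wkt)  # different instance, same cache
```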
§V: print() MUST NOT appear in backend/ core code
§V: retry backoff = min(2**n, 60)
§V: HTTP attempts log source/url/status/attempt/elapsed_ms
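The two rules above can be sketched as follows. This assumes `n` is a 0-based attempt index and that only 5xx statuses are retried; the function names and the `fetch` callable (which returns a status code) are hypothetical simplifications.

```python
import logging
import time
from typing import Callable, List

logger = logging.getLogger("http_retry_example")


def backoff_seconds(n: int) -> int:
    """Capped exponential backoff: min(2**n, 60) seconds."""
    return min(2 ** n, 60)


def fetch_with_retry(
    fetch: Callable[[str], int],
    source: str,
    url: str,
    max_attempts: int = 4,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call fetch(url) until it returns a non-5xx status or attempts run out."""
    status = 0
    for attempt in range(max_attempts):
        started = time.monotonic()
        status = fetch(url)
        elapsed_ms = int((time.monotonic() - started) * 1000)
        logger.info(
            "source=%s url=%s status=%d attempt=%d elapsed_ms=%d",
            source, url, status, attempt, elapsed_ms,
        )
        if status < 500:
            break
        if attempt < max_attempts - 1:
            sleep(backoff_seconds(attempt))
    return status


# Stand-ins so nothing touches the network or actually sleeps.
responses = iter([503, 503, 200])
delays: List[float] = []
final_status = fetch_with_retry(
    fetch=lambda _url: next(responses),
    source="example",
    url="https://example.invalid/sites",
    sleep=delays.append,
)
```

Passing `sleep` as a parameter keeps the retry loop testable without waiting on real delays.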
…xception
§V: no bare except Exception; catch specific types
§V: coordinate range validated before transform
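A minimal sketch of both rules, assuming the coordinates are longitude/latitude in decimal degrees. `validate_lonlat` and `safe_to_transform` are hypothetical names.

```python
def validate_lonlat(lon: float, lat: float) -> None:
    """Raise ValueError if a geographic coordinate is outside its valid range."""
    if not -180.0 <= lon <= 180.0:
        raise ValueError(f"longitude out of range: {lon}")
    if not -90.0 <= lat <= 90.0:
        raise ValueError(f"latitude out of range: {lat}")


def safe_to_transform(lon: float, lat: float) -> bool:
    """Return True only when the point passes range validation."""
    try:
        validate_lonlat(lon, lat)
    except ValueError:  # catch the specific type, not a bare Exception
        return False
    return True


ok = safe_to_transform(-106.6, 35.1)
swapped = safe_to_transform(35.1, -106.6)  # lat/lon swapped: latitude is invalid
```

Checking the range first catches swapped or sentinel coordinates before they reach a projection library, where the failure would be harder to interpret.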
§V: persister selection logic in make_persister(), not in Unifier
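A rough sketch of what moving selection into a `make_persister()` factory could look like. The persister classes and the format keys are hypothetical placeholders; only the function name comes from the rule above.

```python
from typing import Dict, Type


class BasePersister:
    """Hypothetical common base for output writers."""


class CSVPersister(BasePersister):
    pass


class GeoJSONPersister(BasePersister):
    pass


# Registry keyed by output format, so callers never branch on format themselves.
_PERSISTERS: Dict[str, Type[BasePersister]] = {
    "csv": CSVPersister,
    "geojson": GeoJSONPersister,
}


def make_persister(output_format: str) -> BasePersister:
    """Return a persister instance for the requested output format."""
    try:
        persister_cls = _PERSISTERS[output_format]
    except KeyError:
        raise ValueError(f"unknown output format: {output_format!r}") from None
    return persister_cls()


persister = make_persister("geojson")
```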
…ject dependencies
- T11: STSource mixin → STClient composition; single inheritance throughout
- T12: 5 ST2 subclasses → functools.partial configured instances
- T13: CloudStoragePersister → OutputStrategy protocol (LocalFileStrategy, GCSStrategy)
- T14: transformer_klass class attr → transformer= constructor injection; BaseTransformer default
- T15: WaterLevelRecord/AnalyteRecord/WaterLevelSummaryRecord/AnalyteSummaryRecord deleted;
  record_type field replaces isinstance() checks
- T18: BaseParameterSource.read() extracts _summarize_records/_build_timeseries_records;
  fixes double do_transform() call in timeseries path
- T21: BaseParameterSource 477→133 lines; RecordValidator + subclasses extracted;
  RecordSummarizer extracted; read()→read_summary()+read_timeseries(); bookend→position
- T22: do_transform() 191-line monolith → orchestrator + 6 focused helpers
  (_apply_geographic_filter, _standardize_datetime, _apply_datum_transform,
  _apply_elevation_transform, _apply_well_depth_transform, _apply_unit_conversion)
- T23: get_config_and_false_agencies() 107-line if/elif → PARAMETER_SOURCE_MAP lookup;
  _build_source_pair() DRYs analyte_sources()/water_level_sources()
- T24: BaseSource.__init__ accepts http_client=; all httpx.get() → self._http_client.get()
- T25: backend/converter.py: UnitConverter protocol + StandardUnitConverter;
  BaseTransformer.converter injectable; convert_units() kept as deprecated wrapper
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
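To make T12 and T24 concrete, here is a simplified sketch of a source that receives its HTTP client through the constructor and is configured per agency with `functools.partial` rather than a subclass. Every class name, the agency, and the URL are hypothetical, and the client's `get()` returns parsed JSON directly, which is a simplification of a real `httpx` response.

```python
from functools import partial
from typing import Any, Dict, List, Protocol


class HttpClient(Protocol):
    def get(self, url: str) -> Dict[str, Any]: ...


class FakeHttpClient:
    """Test double that records requested URLs and returns canned JSON."""

    def __init__(self) -> None:
        self.urls: List[str] = []

    def get(self, url: str) -> Dict[str, Any]:
        self.urls.append(url)
        return {"value": [{"name": "well-1"}]}


class ST2Source:
    def __init__(self, agency: str, base_url: str, http_client: HttpClient) -> None:
        self.agency = agency
        self.base_url = base_url
        self._http_client = http_client  # injected, never created internally

    def location_names(self) -> List[str]:
        payload = self._http_client.get(f"{self.base_url}/Locations")
        return [item["name"] for item in payload["value"]]


# One configured factory per agency instead of one subclass per agency.
ExampleAgencySource = partial(
    ST2Source, agency="example", base_url="https://example.invalid/v1.1"
)

client = FakeHttpClient()
source = ExampleAgencySource(http_client=client)
names = source.location_names()
```

Because the client is injected, tests can exercise the source without network access, and the per-agency differences live in data rather than in a class hierarchy.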
Make the repo show up in Dagster+ as a serverless code location.
- dagster_cloud.yaml: location die-orchestration, module orchestration.definitions, build dir orchestration (its pyproject carries the dagster deps + nmuwd path-source).
- GitHub Actions: prod deploy on push to main + per-PR branch deployments, PEX fast deploy (no system GDAL needed; only shapely, which ships a bundled-GEOS wheel).
- orchestration/deploy_serverless.sh: manual deploy via dagster-cloud serverless deploy-python-executable, same build definition as CI.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The build job failed at the flake8 step (linting .venv site-packages),
which masked mypy errors the refactor introduced. Fix the whole gate:
- .flake8: exclude .venv + output dirs (extend-exclude keeps defaults).
- logger.py: drop dead `global _managed_handlers`, annotate the list.
- mypy fixes (0 errors, was 33):
  - source.py: Optional defaults for validator/transformer, widen
    _extract_terminal_record position to str, annotate RecordValidator
    .config, cast site records in _transform_sites, cast read() returns
    (List is invariant).
  - record.py: declare BaseRecord.chunk_size.
  - persister.py: add_extension extension arg is str (callers pass .value).
  - transformer.py: coerce convert() input to float.
  - geo_utils.py / geoserver.py / factory.py / st_connector.py: type
    annotations for module globals, declarative Base, optional import,
    and Optional url attrs.
- pyproject.toml: exclude tests/archived from mypy (already excluded
from pytest; references an old CLI API).
flake8 + mypy + pytest (427 tests) all green locally with the CI env
(uv sync --extra dev).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Verified via cross-reference (grep + framework-entry-point check) before removal; kept Flask/FastAPI route handlers, click commands, and inherited base-class slots that vulture flags as "unused".
Removed:
- Dead functions/classes: bounding_polygons get_* helpers (6), geo_utils utm/lonlat converters (+ orphaned PROJECTIONS), converter UnitConverter Protocol, logger Loggable, strategies OutputStrategy, source BaseContainerSource/BaseFileSource, transformer convert_units (deprecated), unifier get_sources_in_polygon/generate_site_bounds/*_unification_test/get_datastream(s) + stale commented call block.
- orchestration/assets/wells.py (build_wells_asset, imported nowhere).
- 63 unused imports across backend/frontend/orchestration (autoflake).
flake8 (E9,F63,F7,F82) + mypy clean; 427 tests collect with no import errors; local persister tests pass.
ckan connector appears entirely unwired (config uses st2 instead); flagged separately, not removed here.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…-serverless Add Dagster+ serverless code location + deploy paths
The serverless deploy failed hard (Invalid URL '/graphql') whenever the DAGSTER_CLOUD_URL / DAGSTER_CLOUD_API_TOKEN secrets are unset, blocking every PR. Add a guard step so the deploy is skipped (green) when the secrets are absent, and only runs once they are configured.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Summary
- `pyproject.toml` replaces `setup.py`/`requirements.txt`/`pytest.ini`/`mypy.ini`; CI uses `uv run`
- `backend/persisters/ogc_features.py`: `dump_summary_collection` + `dump_timeseries_collection`
- `orchestration/`: Dagster assets, GCS resource, Cloud Run Dockerfile, pygeoapi config
- `backend/`

Key refactoring changes (T10–T26)
- `Loggable` base → `make_logger()` factory injection
- `STSource` mixin → `STClient` composition, single inheritance
- 5 ST2 subclasses → `functools.partial` configured instances
- `CloudStoragePersister` → `OutputStrategy` protocol (`LocalFileStrategy`, `GCSStrategy`)
- `transformer_klass` class attr → `transformer=` constructor injection
- `WaterLevelRecord`/`AnalyteRecord` subclasses deleted; `record_type` field replaces `isinstance()`
- Retry backoff `min(2^n, 60)`s; structured HTTP request logging
- Polygon cache keyed by WKT at class level (`_polygon_cache`)
- `_summarize_records`/`_build_timeseries_records` extracted; fixes double `do_transform()` call
- `print()` → structured logging
- `BaseParameterSource` 477→133 lines; `RecordValidator` + `RecordSummarizer` extracted; `bookend` → `position`
- `do_transform()` 191-line monolith → orchestrator + 6 focused helpers
- `PARAMETER_SOURCE_MAP` replaces 107-line if/elif chain; `_build_source_pair()`
- `http_client=` injectable into `BaseSource`; no direct `httpx.get()` calls
- `UnitConverter` protocol + `StandardUnitConverter`; injectable into `BaseTransformer`
- `PersisterFactory` extracted from `Unifier`

Test plan
- `uv run pytest tests/test_persisters/ tests/test_cli/ -q`: 9 passed
- `uv sync --extra dev` succeeds from lockfile
- `uv run die weave --parameter waterlevels --county bernalillo --output-summary` (requires live network + `frost_sta_client`)
- `uv run dagster asset materialize -f orchestration/definitions.py --select nm_waterlevels_summary` (requires GCP credentials)

🤖 Generated with Claude Code